
SparkSQL | Window Functions


Window functions, in the words of an expert: "a window function calculates a return value for every input row of a table based on a group of rows". How window functions differ from other functions:



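  • Ordinary (scalar) functions: applied to each record, producing a new column (the row count stays the same);

  • Aggregate functions: applied to a group of records (all the data is split into groups in some way), producing one aggregate value per group (the row count shrinks);

  • Window functions: applied to each record; for every record, a specified set of rows is used to compute a value (the row count stays the same).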
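To make the row-count difference concrete, here is a plain-Python sketch that needs no Spark. It uses the salary data loaded later in this article. It only illustrates the semantics and is not how Spark executes a query. The "salary * 12" column is a made-up example of an ordinary function. The window column corresponds to `sum(salary) over(partition by department)` with no ORDER BY, where every row sees its whole department.

```python
from collections import defaultdict

# (name, department, salary): the sample data used throughout this article
rows = [
    ("Tom", "Sales", 4500), ("Georgi", "Sales", 4200), ("Kyoichi", "Sales", 3000),
    ("Berni", "Sales", 4700), ("Guoxiang", "Sales", 4200), ("Parto", "Finance", 2700),
    ("Anneke", "Finance", 3300), ("Sumant", "Finance", 3900),
    ("Jeff", "Marketing", 3100), ("Patricio", "Marketing", 2500),
]

# Ordinary function: one output per input row (10 rows in, 10 rows out)
scalar = [(name, salary * 12) for name, _, salary in rows]

# Aggregate function: one output per group (10 rows in, 3 rows out)
totals = defaultdict(int)
for _, dept, salary in rows:
    totals[dept] += salary
aggregate = sorted(totals.items())

# Window function: every input row is kept and gets its group's value attached
window = [(name, dept, salary, totals[dept]) for name, dept, salary in rows]

print(len(scalar), len(aggregate), len(window))  # 10 3 10
print(aggregate)  # [('Finance', 9900), ('Marketing', 5600), ('Sales', 20600)]
```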

Window function syntax: function_name(arguments) OVER (PARTITION BY clause ORDER BY clause ROWS/RANGE clause)



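  • Function name: the window function to evaluate (a ranking, analytic or aggregate function);

  • OVER: keyword that marks this as a window function rather than an ordinary aggregate function;

  • Clauses:

    • PARTITION BY: the columns that split the rows into groups (partitions)

    • ORDER BY: the columns that order the rows within each partition

    • ROWS/RANGE frame clause: controls the boundaries of the window (the frame). There are two kinds, ROWS and RANGE:

      • ROWS: a physical window; rows are selected by their position in the sorted partition

      • RANGE: a logical window; rows are selected by their ORDER BY value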
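The difference between the two frame types is easiest to see when the ORDER BY column contains ties. The plain-Python sketch below uses the Sales salaries sorted in ascending order. It evaluates them under `rows between 1 preceding and current row` and under `range between 500 preceding and current row`. The helper names are hypothetical, and the code only mimics the frame semantics.

```python
# Sales salaries, sorted by the ORDER BY column (salary, ascending)
salaries = [3000, 4200, 4200, 4500, 4700]

def rows_frame_sum(values, preceding):
    """SUM over ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW: frame chosen by position."""
    return [sum(values[max(0, i - preceding): i + 1]) for i in range(len(values))]

def range_frame_sum(values, preceding):
    """SUM over RANGE BETWEEN <preceding> PRECEDING AND CURRENT ROW: frame chosen by value."""
    return [sum(v for v in values if current - preceding <= v <= current) for current in values]

print(rows_frame_sum(salaries, 1))     # [3000, 7200, 8400, 8700, 9200]
print(range_frame_sum(salaries, 500))  # [3000, 8400, 8400, 12900, 17600]
```

With ROWS, the two 4200 rows get different sums (7200 and 8400) because each one looks back at a different physical row. With RANGE, both rows see every row whose salary lies in [3700, 4200], so they get the same value.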





There are three main kinds of window functions:



  • ranking functions

  • analytic functions

  • aggregate functions


Loading the data

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
spark = SparkSession.builder.getOrCreate()  # reuse the active SparkSession, or start one
schema = StructType().add('name', StringType(), True).add('department', StringType(), True).add('salary', IntegerType(), True)
df = spark.createDataFrame([("Tom", "Sales", 4500), ("Georgi", "Sales", 4200), ("Kyoichi", "Sales", 3000), ("Berni", "Sales", 4700),
                            ("Guoxiang", "Sales", 4200), ("Parto", "Finance", 2700), ("Anneke", "Finance", 3300), ("Sumant", "Finance", 3900),
                            ("Jeff", "Marketing", 3100), ("Patricio", "Marketing", 2500)], schema=schema)
df.createOrReplaceTempView('salary')  # expose the DataFrame to spark.sql() as the table `salary`
df.show()

+--------+----------+------+
|    name|department|salary|
+--------+----------+------+
|     Tom|     Sales|  4500|
|  Georgi|     Sales|  4200|
| Kyoichi|     Sales|  3000|
|   Berni|     Sales|  4700|
|Guoxiang|     Sales|  4200|
|   Parto|   Finance|  2700|
|  Anneke|   Finance|  3300|
|  Sumant|   Finance|  3900|
|    Jeff| Marketing|  3100|
|Patricio| Marketing|  2500|
+--------+----------+------+

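ranking functions

SQL            DataFrame      Description
row_number     rowNumber      Sequential number 1..n within the partition; every row gets a distinct number
rank           rank           Rank of the row; equal values share a rank, and the ranks after a tie are skipped (1, 2, 2, 4)
dense_rank     denseRank      Like rank, equal values share a rank, but no ranks are skipped (1, 2, 2, 3)
percent_rank   percentRank    (rank within the partition - 1) / (rows in the partition - 1); 0 if the partition has only one row
ntile          ntile          ntile(n) splits the ordered partition into n buckets and returns the current row's bucket number (starting at 1)

The camelCase names in the DataFrame column come from older Spark releases. In current PySpark, pyspark.sql.functions provides row_number, rank, dense_rank, percent_rank and ntile.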
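The ranking rules can be sketched in plain Python for a single partition that is already sorted by the ORDER BY column. This is an illustrative reimplementation, and the `ranking` helper is hypothetical. It runs on the Sales salaries from the example data, and its output matches the Sales rows of the query result that follows.

```python
def ranking(values):
    """Return (value, row_number, rank, dense_rank, percent_rank) for each row.

    `values` is one partition, already sorted by the ORDER BY column.
    """
    n = len(values)
    result = []
    rank = dense_rank = 0
    for i, value in enumerate(values):
        row_number = i + 1
        if i == 0 or value != values[i - 1]:
            rank = row_number  # new value: rank jumps to the row position, leaving gaps after ties
            dense_rank += 1    # new value: dense_rank moves to the next integer, no gaps
        percent_rank = 0.0 if n == 1 else (rank - 1) / (n - 1)
        result.append((value, row_number, rank, dense_rank, percent_rank))
    return result

for row in ranking([3000, 4200, 4200, 4500, 4700]):  # the Sales partition
    print(row)
# (3000, 1, 1, 1, 0.0)
# (4200, 2, 2, 2, 0.25)
# (4200, 3, 2, 2, 0.25)
# (4500, 4, 4, 3, 0.75)
# (4700, 5, 5, 4, 1.0)
```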

spark.sql("""
SELECTname ,department,salary,row_number() over(partition by department order by salary) as index,rank() over(partition by department order by salary) as rank,dense_rank() over(partition by department order by salary) as dense_rank,percent_rank() over(partition by department order by salary) as percent_rank,ntile(2) over(partition by department order by salary) as ntile
FROM salary
"""
).toPandas()




       name department  salary  index  rank  dense_rank  percent_rank  ntile
0  Patricio  Marketing    2500      1     1           1          0.00      1
1      Jeff  Marketing    3100      2     2           2          1.00      2
2   Kyoichi      Sales    3000      1     1           1          0.00      1
3    Georgi      Sales    4200      2     2           2          0.25      1
4  Guoxiang      Sales    4200      3     2           2          0.25      1
5       Tom      Sales    4500      4     4           3          0.75      2
6     Berni      Sales    4700      5     5           4          1.00      2
7     Parto    Finance    2700      1     1           1          0.00      1
8    Anneke    Finance    3300      2     2           2          0.50      1
9    Sumant    Finance    3900      3     3           3          1.00      2
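Why does Sales get buckets 1, 1, 1, 2, 2? When the row count is not divisible by n, the usual NTILE rule gives the first (rows % n) buckets one extra row each. That rule is consistent with all three partitions in the output above, and it is assumed here to be what Spark does. A plain-Python sketch with a hypothetical `ntile` helper:

```python
def ntile(num_rows, n):
    """Bucket number (1-based) for each of num_rows sorted rows split into n buckets."""
    base, extra = divmod(num_rows, n)
    buckets = []
    for bucket in range(1, n + 1):
        size = base + (1 if bucket <= extra else 0)  # the first `extra` buckets get one more row
        buckets.extend([bucket] * size)
    return buckets

print(ntile(5, 2))  # [1, 1, 1, 2, 2]  (Sales)
print(ntile(3, 2))  # [1, 1, 2]        (Finance)
print(ntile(2, 2))  # [1, 2]           (Marketing)
```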

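analytic functions

SQL          DataFrame    Description
cume_dist    cumeDist     (rows in the partition with a value <= the current row's value) / (total rows in the partition)
lag          lag          lag(input[, offset[, default]]): the value of input from the row offset rows before the current row (offset defaults to 1); returns default, or NULL if no default is given, when that row does not exist
lead         lead         The mirror image of lag: the value of input from the row offset rows after the current row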
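cume_dist counts tied rows together: every row whose value is less than or equal to the current one is included. Here is a plain-Python sketch for one partition sorted in ascending order. The helper name is hypothetical.

```python
def cume_dist(values):
    """cume_dist for one partition sorted ascending by the ORDER BY column."""
    n = len(values)
    # Count every row whose value is <= the current value, so tied rows count together
    return [sum(1 for v in values if v <= current) / n for current in values]

print(cume_dist([3000, 4200, 4200, 4500, 4700]))  # [0.2, 0.6, 0.6, 0.8, 1.0]
```

For the same Sales rows, percent_rank gives 0.0, 0.25, 0.25, 0.75, 1.0. percent_rank is based on the rank and starts at 0, while cume_dist starts at 1/n and counts the tied rows.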

spark.sql("""
SELECTname ,department,salary,row_number() over(partition by department order by salary) as index,cume_dist() over(partition by department order by salary) as cume_dist,lag('salary', 2) over(partition by department order by salary) as lag,lead('salary', 2) over(partition by department order by salary) as lead FROM salary
"""
).toPandas()




namedepartmentsalaryindexcume_distlaglead
0PatricioMarketing250010.500000NoneNone
1JeffMarketing310021.000000NoneNone
2KyoichiSales300010.200000Nonesalary
3GeorgiSales420020.600000Nonesalary
4GuoxiangSales420030.600000salarysalary
5TomSales450040.800000salaryNone
6BerniSales470051.000000salaryNone
7PartoFinance270010.333333Nonesalary
8AnnekeFinance330020.666667NoneNone
9SumantFinance390031.000000salaryNone
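lag and lead simply look a fixed number of positions back or forward inside the sorted partition. Here is a plain-Python sketch of that rule. The helper names are hypothetical, and `offset` is assumed to be non-negative.

```python
def lag(values, offset=1, default=None):
    """Value `offset` rows before the current row, or `default` if there is none."""
    return [values[i - offset] if i - offset >= 0 else default for i in range(len(values))]

def lead(values, offset=1, default=None):
    """Value `offset` rows after the current row, or `default` if there is none."""
    return [values[i + offset] if i + offset < len(values) else default for i in range(len(values))]

sales = [3000, 4200, 4200, 4500, 4700]  # Sales partition ordered by salary
print(lag(sales, 2))     # [None, None, 3000, 4200, 4200]
print(lead(sales, 2))    # [4200, 4500, 4700, None, None]
print(lag(sales, 1, 0))  # [0, 3000, 4200, 4200, 4500]
```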

aggregate functions

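These are the ordinary aggregate functions, evaluated over a window instead of over a whole group.

SQL    Description
avg    Average
sum    Sum
min    Minimum
max    Maximum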

spark.sql("""
SELECTname ,department,salary,row_number() over(partition by department order by salary) as index,sum(salary) over(partition by department order by salary) as sum,avg(salary) over(partition by department order by salary) as avg,min(salary) over(partition by department order by salary) as min,max(salary) over(partition by department order by salary) as max
FROM salary
"""
).toPandas()




       name department  salary  index    sum     avg   min   max
0  Patricio  Marketing    2500      1   2500  2500.0  2500  2500
1      Jeff  Marketing    3100      2   5600  2800.0  2500  3100
2   Kyoichi      Sales    3000      1   3000  3000.0  3000  3000
3    Georgi      Sales    4200      2  11400  3800.0  3000  4200
4  Guoxiang      Sales    4200      3  11400  3800.0  3000  4200
5       Tom      Sales    4500      4  15900  3975.0  3000  4500
6     Berni      Sales    4700      5  20600  4120.0  3000  4700
7     Parto    Finance    2700      1   2700  2700.0  2700  2700
8    Anneke    Finance    3300      2   6000  3000.0  2700  3300
9    Sumant    Finance    3900      3   9900  3300.0  2700  3900
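In the output above, Georgi and Guoxiang both get a sum of 11400 even though their index values differ. This comes from the default frame. When ORDER BY is given without a frame clause, Spark uses RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so tied rows (peers) fall into each other's frame. The plain-Python sketch below reproduces the Sales rows under that assumption. The helper name is hypothetical.

```python
sales = [3000, 4200, 4200, 4500, 4700]  # Sales partition, ordered by salary ascending

def running_aggregates(values):
    """(value, sum, avg, min, max) over RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW."""
    out = []
    for current in values:
        # Ascending order: the frame is every row up to and including the current row's peers
        frame = [v for v in values if v <= current]
        out.append((current, sum(frame), sum(frame) / len(frame), min(frame), max(frame)))
    return out

for row in running_aggregates(sales):
    print(row)
# (3000, 3000, 3000.0, 3000, 3000)
# (4200, 11400, 3800.0, 3000, 4200)
# (4200, 11400, 3800.0, 3000, 4200)
# (4500, 15900, 3975.0, 3000, 4500)
# (4700, 20600, 4120.0, 3000, 4700)
```

With an explicit `rows between unbounded preceding and current row` frame, the first of the two 4200 rows would get a running sum of 7200 instead.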

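Window frame clause

The ROWS/RANGE frame clause controls the boundaries of the window. There are two kinds, ROWS and RANGE:



  • ROWS: a physical window; rows are selected by their position in the sorted partition

  • RANGE: a logical window; rows are selected by their ORDER BY value

Syntax: OVER (PARTITION BY … ORDER BY … frame_type BETWEEN start AND end), where frame_type is ROWS or RANGE

There are five kinds of boundary:



  • CURRENT ROW: the current row

  • UNBOUNDED PRECEDING: the first row of the partition

  • UNBOUNDED FOLLOWING: the last row of the partition

  • n PRECEDING: n rows before the current row (with RANGE, n is an offset on the ORDER BY value rather than a row count)

  • n FOLLOWING: n rows after the current row (with RANGE, likewise a value offset)

UNBOUNDED is not a boundary on its own: it is always combined with PRECEDING or FOLLOWING.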

spark.sql("""
SELECTname ,department,salary,row_number() over(partition by department order by salary) as index,row_number() over(partition by department order by salary rows between UNBOUNDED PRECEDING and CURRENT ROW) as index1
FROM salary
"""
).toPandas()




       name department  salary  index  index1
0  Patricio  Marketing    2500      1       1
1      Jeff  Marketing    3100      2       2
2   Kyoichi      Sales    3000      1       1
3    Georgi      Sales    4200      2       2
4  Guoxiang      Sales    4200      3       3
5       Tom      Sales    4500      4       4
6     Berni      Sales    4700      5       5
7     Parto    Finance    2700      1       1
8    Anneke    Finance    3300      2       2
9    Sumant    Finance    3900      3       3
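row_number does not depend on the frame, so index and index1 above are identical. The plain-Python sketch below shows how each boundary shapes a ROWS frame. The `rows_frame` helper and its offset convention are hypothetical and chosen only for illustration: None stands for UNBOUNDED, 0 for CURRENT ROW, a negative number for PRECEDING and a positive number for FOLLOWING.

```python
def rows_frame(values, i, start, end):
    """Rows in the frame ROWS BETWEEN start AND end for the row at position i.

    start/end are offsets relative to the current row: None means UNBOUNDED,
    0 means CURRENT ROW, -n means n PRECEDING and +n means n FOLLOWING.
    """
    lo = 0 if start is None else max(0, i + start)
    hi = len(values) if end is None else min(len(values), i + end + 1)
    return values[lo:hi]

sales = [3000, 4200, 4200, 4500, 4700]  # Sales partition, ordered by salary
n = len(sales)

# UNBOUNDED PRECEDING .. CURRENT ROW: the frame grows by one row at a time
print([len(rows_frame(sales, i, None, 0)) for i in range(n)])  # [1, 2, 3, 4, 5]
# 1 PRECEDING .. 1 FOLLOWING: a 3-row moving sum
print([sum(rows_frame(sales, i, -1, 1)) for i in range(n)])    # [7200, 11400, 12900, 13400, 9200]
# CURRENT ROW .. UNBOUNDED FOLLOWING: rows remaining in the partition
print([len(rows_frame(sales, i, 0, None)) for i in range(n)])  # [5, 4, 3, 2, 1]
```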

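Combined usage

Window function results can be used inside ordinary expressions. Here each salary is compared with min(salary) over the department window. Because the partition is sorted by salary in ascending order, that value is the lowest salary in the department.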

spark.sql("""
SELECTname ,department,salary,row_number() over(partition by department order by salary) as index,salary - (min(salary) over(partition by department order by salary)) as salary_diff
FROM salary
"""
).toPandas()




       name department  salary  index  salary_diff
0  Patricio  Marketing    2500      1            0
1      Jeff  Marketing    3100      2          600
2   Kyoichi      Sales    3000      1            0
3    Georgi      Sales    4200      2         1200
4  Guoxiang      Sales    4200      3         1200
5       Tom      Sales    4500      4         1500
6     Berni      Sales    4700      5         1700
7     Parto    Finance    2700      1            0
8    Anneke    Finance    3300      2          600
9    Sumant    Finance    3900      3         1200
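The same salary_diff column can be sketched in plain Python. Because the window min() here always equals the department's lowest salary, the diff is just each salary minus its department minimum. The `dept_min` and `salary_diff` names are hypothetical.

```python
# (name, department, salary): the sample data loaded earlier in this article
rows = [
    ("Tom", "Sales", 4500), ("Georgi", "Sales", 4200), ("Kyoichi", "Sales", 3000),
    ("Berni", "Sales", 4700), ("Guoxiang", "Sales", 4200), ("Parto", "Finance", 2700),
    ("Anneke", "Finance", 3300), ("Sumant", "Finance", 3900),
    ("Jeff", "Marketing", 3100), ("Patricio", "Marketing", 2500),
]

# Lowest salary per department (what the window min() sees at every row here)
dept_min = {}
for _, dept, salary in rows:
    dept_min[dept] = min(salary, dept_min.get(dept, salary))

# Every row is kept; each salary is reduced by its department's minimum
salary_diff = {name: salary - dept_min[dept] for name, dept, salary in rows}
print(salary_diff["Berni"], salary_diff["Sumant"])  # 1700 1200
```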

References



  • Introducing Window Functions in Spark SQL

  • Standard Functions for Window Aggregation (Window Functions)

  • List Of Spark SQL Window Functions

  • 在hive、Spark SQL中引入窗口函数 (Introducing window functions in Hive and Spark SQL)



Author: 高振Andy
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有